package com.qsl.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Thin wrapper that publishes string messages through the {@code secondKafkaTemplate} bean
 * and logs the outcome of each send.
 *
 * <p>The template is injected optionally (see {@link #setKafkaTemplate(KafkaTemplate)}). While no
 * template has been injected, {@link #send(String, String)} logs a warning and does nothing.
 *
 * @author 青石路
 */
@Component
public class KafkaSender {

    private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);

    /** Template used for publishing; stays {@code null} until the setter is invoked. */
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code msg} to {@code topic} and registers callbacks that log the result.
     *
     * <p>If no template has been injected, a warning is logged and the message is dropped.
     * The method does not wait for the send result; success and failure are only reported
     * through the log.
     *
     * @param topic the destination topic
     * @param msg   the message payload
     */
    public void send(String topic, String msg) {
        if (kafkaTemplate == null) {
            log.warn("未启用secondKafka，不发送消息");
            return;
        }
        kafkaTemplate.send(topic, msg).addCallback(
                success -> {
                    // The callback may be handed a null result; only log when there is one.
                    if (success != null) {
                        log.info("消息发送成功: Topic={}, Partition={}, Offset={}",
                                success.getRecordMetadata().topic(),
                                success.getRecordMetadata().partition(),
                                success.getRecordMetadata().offset());
                    }
                },
                failure -> {
                    // Log the throwable itself, not failure.getCause(): the cause may be null,
                    // in which case nothing but the bare message would be logged, and the
                    // outer exception's own type/message would be lost either way. The cause
                    // chain is still printed as part of the stack trace.
                    log.error("消息发送失败：", failure);
                }
        );
    }

    /**
     * Injects the template qualified as {@code secondKafkaTemplate}.
     *
     * <p>Declared with {@code required = false} so that the component can still be created
     * when that bean is not defined; the field then remains {@code null}.
     *
     * @param kafkaTemplate the template to publish with
     */
    @Autowired(required = false)
    @Qualifier("secondKafkaTemplate")
    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}
